<!-- should always link the the latest release's documentation -->
<!--#include virtual="../../10/streams/quickstart.html" -->
<!--
 Licensed to the Apache Software Foundation (ASF) under one or more
 contributor license agreements.  See the NOTICE file distributed with
 this work for additional information regarding copyright ownership.
 The ASF licenses this file to You under the Apache License, Version 2.0
 (the "License"); you may not use this file except in compliance with
 the License.  You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

 Unless required by applicable law or agreed to in writing, software
 distributed under the License is distributed on an "AS IS" BASIS,
 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 See the License for the specific language governing permissions and
 limitations under the License.
-->
<script>
  /*
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements.  See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License.  You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

// Define variables for doc templates
var context={
    "version": "10",
    "dotVersion": "1.0",
    "fullDotVersion": "1.0.0",
    "scalaVersion": "2.11"
};

<!--#include virtual="../js/templateData.js" --></script>

<script id="content-template" type="text/x-handlebars-template">

<h1>运行 Streams 的 Demo 程序</h1>
<div class="sub-nav-sticky">
        <div class="sticky-top">
            <div style="height:35px">
                <a href="{{version}}/documentation/streams/">简介</a>
                <a href="{{version}}/documentation/streams/developer-guide.html">开发者指南</a>
                <a href="{{version}}/documentation/streams/core-concepts.html">核心思想</a>
                <a class="active-menu-item" href="{{version}}/documentation/streams/quickstart.html">运行 demo 程序</a>
                <a href="{{version}}/documentation/streams/tutorial.html">编写自己的流处理程序</a>
            </div>
        </div>
</div>  
<p>
  本教程假定您是新手并且没有现成的 Kafka 或 ZooKeeper 数据。 但是，如果您已经启动了Kafka和ZooKeeper，请随时跳过前两个步骤。
</p>

  <p>
  Kafka Streams 是用于构建关键任务的实时应用程序和微服务的客户端库，输入或输出数据存储在Kafka集群中。 Kafka Streams 结合了在客户端开发和部署标准Java和Scala应用程序的简易性，以及通过 Kafka 服务器端集群技术的优势，使这些应用程序具有高度可伸缩性，弹性，容错性，分布式等特性。
  </p>
  <p>这个简易示例将演示如何在这个库中运行一个流应用程序。以下是 <code><a href="https://github.com/apache/kafka/blob/{{dotVersion}}/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java">WordCountDemo</a></code> 的示例代码（转换为使用Java 8 lambda表达式以便于阅读）。
  </p>
<pre class="brush: java;">
// Serializers/deserializers (serde) for String and Long types
final Serde&lt;String&gt; stringSerde = Serdes.String();
final Serde&lt;Long&gt; longSerde = Serdes.Long();

// Construct a `KStream` from the input topic "streams-plaintext-input", where message values
// represent lines of text (for the sake of this example, we ignore whatever may be stored
// in the message keys).
KStream&lt;String, String&gt; textLines = builder.stream("streams-plaintext-input",
    Consumed.with(stringSerde, stringSerde);

KTable&lt;String, Long&gt; wordCounts = textLines
    // Split each text line, by whitespace, into words.
    .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))

    // Group the text words as message keys
    .groupBy((key, value) -> value)

    // Count the occurrences of each word (message key).
    .count()

// Store the running counts as a changelog stream to the output topic.
wordCounts.toStream().to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long()));
</pre>

<p>
    以上代码实现了WordCount算法，它根据输入文本计算一个单词出现频次的直方图。
    但是，这个程序与您之前可能看到过的对有界数据进行操作的其他WordCount程序不同，这个WordCount演示应用程序的行为稍有不同，因为它是
    设计用于操作<b>无穷、无界的流数据</b>。 类似于有界变体，它是一个可以跟踪和更新单词的数量的有状态的算法。
    但是，因为它必须假设输入数据是无界的，因此它将定期输出当前状态和结果，同时继续处理更多数据，因为它无法知道它何时能处理完“全部”输入数据。
</p>
<p>
  第一步，我们将启动 Kafka (除非你已经启动)，然后我们将为 Kafka topic 准备输入数据，随后将由 Kafka 流应用程序处理流数据。
</p>

<h4><a id="quickstart_streams_download" href="#quickstart_streams_download">步骤1： 下载源代码</a></h4>

<a href="https://www.apache.org/dyn/closer.cgi?path=/kafka/{{fullDotVersion}}/kafka_{{scalaVersion}}-{{fullDotVersion}}.tgz" title="Kafka downloads">下载</a> {{fullDotVersion}} release版本并解压。
注意有多个可下载的Scala版本程序，我们选择推荐的版本 ({{scalaVersion}}) :

<pre class="brush: bash;">
&gt; tar -xzf kafka_{{scalaVersion}}-{{fullDotVersion}}.tgz
&gt; cd kafka_{{scalaVersion}}-{{fullDotVersion}}
</pre>

<h4><a id="quickstart_streams_startserver" href="#quickstart_streams_startserver">步骤2： 启动 Kafka 服务</a></h4>

<p>
Kafka 使用到 <a href="https://zookeeper.apache.org/">ZooKeeper</a> ，如果还没安装 ZooKeeper 的话需要先安装并启动 ZooKeeper 服务。 可以使用 kafka 打包的便捷脚本获取一个简单粗糙的单节点 ZooKeeper 实例。
</p>

<pre class="brush: bash;">
&gt; bin/zookeeper-server-start.sh config/zookeeper.properties
[2013-04-22 15:01:37,495] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
...
</pre>

<p>现在可以启动 Kafka 服务：</p>
<pre class="brush: bash;">
&gt; bin/kafka-server-start.sh config/server.properties
[2013-04-22 15:01:47,028] INFO Verifying properties (kafka.utils.VerifiableProperties)
[2013-04-22 15:01:47,051] INFO Property socket.send.buffer.bytes is overridden to 1048576 (kafka.utils.VerifiableProperties)
...
</pre>


<h4><a id="quickstart_streams_prepare" href="#quickstart_streams_prepare">步骤3：准备 input topic 和启动 Kafka producer</a></h4>

<!--

<pre class="brush: bash;">
&gt; echo -e "all streams lead to kafka\nhello kafka streams\njoin kafka summit" > file-input.txt
</pre>
Or on Windows:
<pre class="brush: bash;">
&gt; echo all streams lead to kafka> file-input.txt
&gt; echo hello kafka streams>> file-input.txt
&gt; echo|set /p=join kafka summit>> file-input.txt
</pre>

-->

接下来，我们创建名字为 <b>streams-plaintext-input</b> 的 input topic 以及名字为 <b>streams-wordcount-output</b> 的 output topic：

<pre class="brush: bash;">
&gt; bin/kafka-topics.sh --create \
    --zookeeper localhost:2181 \
    --replication-factor 1 \
    --partitions 1 \
    --topic streams-plaintext-input
Created topic "streams-plaintext-input".
</pre>

注意：因为 output stream 是changlog stream，因此我们创建 output stream 的时候启动了压缩。
Note: we create the output topic with compaction enabled because the output stream is a changelog stream
(参考 <a href="#anchor-changelog-output">explanation of application output</a> ).

<pre class="brush: bash;">
&gt; bin/kafka-topics.sh --create \
    --zookeeper localhost:2181 \
    --replication-factor 1 \
    --partitions 1 \
    --topic streams-wordcount-output \
    --config cleanup.policy=compact
Created topic "streams-wordcount-output".
</pre>

已创建的 topic 可以使用同样的 <b>kafka-topics</b> 工具来查询描述信息：

<pre class="brush: bash;">
&gt; bin/kafka-topics.sh --zookeeper localhost:2181 --describe

Topic:streams-plaintext-input	PartitionCount:1	ReplicationFactor:1	Configs:
    Topic: streams-plaintext-input	Partition: 0	Leader: 0	Replicas: 0	Isr: 0
Topic:streams-wordcount-output	PartitionCount:1	ReplicationFactor:1	Configs:
	Topic: streams-wordcount-output	Partition: 0	Leader: 0	Replicas: 0	Isr: 0
</pre>

<h4><a id="quickstart_streams_start" href="#quickstart_streams_start">步骤4： 启动 Wordcount 程序</a></h4>

以下命令将启动 WordCount 示例程序：

<pre class="brush: bash;">
&gt; bin/kafka-run-class.sh org.apache.kafka.streams.examples.wordcount.WordCountDemo
</pre>

<p>
    示例程序会从 input topic <b>streams-plaintext-input</b> 读取数据，以 WordCount 算法计算每条读到的消息，
    并持续将当前的计算结果写到 output topic <b>streams-wordcount-output</b>。
    因此，除了日志之外不会哟其他输出结果打印到STDOUT标准输出流，因为计算结构都回写到 Kafka 中。
</p>

现在我们可以在另外一个终端启动 Producer 会话来为这个 topic 写入数据：

<pre class="brush: bash;">
&gt; bin/kafka-console-producer.sh --broker-list localhost:9092 --topic streams-plaintext-input
</pre>

然后可以在另外一个终端启动 consumer 会话从 output topic 中读取并检查 WordCount 示例程序的输出：

<pre class="brush: bash;">
&gt; bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
    --topic streams-wordcount-output \
    --from-beginning \
    --formatter kafka.tools.DefaultMessageFormatter \
    --property print.key=true \
    --property print.value=true \
    --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
    --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
</pre>


<h4><a id="quickstart_streams_process" href="#quickstart_streams_process">步骤5：处理数据</a></h4>

Now let's write some message with the console producer into the input topic <b>streams-plaintext-input</b> by entering a single line of text and then hit &lt;RETURN&gt;.
This will send a new message to the input topic, where the message key is null and the message value is the string encoded text line that you just entered
(in practice, input data for applications will typically be streaming continuously into Kafka, rather than being manually entered as we do in this quickstart):

现在我们可以在 producer 控制台通过输入单行文本并回车的方式输入消息到 input topic <b>streams-plaintext-input</b>。
这样会输入一条新的消息到 input topic，这条消息的key值是null，value值是刚刚输入的文本（实践中，应用程序的输入数据通常会不断地流进 Kafka，而不是像我们在这样子手动输入）：


<pre class="brush: bash;">
&gt; bin/kafka-console-producer.sh --broker-list localhost:9092 --topic streams-plaintext-input
all streams lead to kafka
</pre>

<p>
该消息将被 Wordcount 程序处理而且以下结果将写入到 <b>streams-wordcount-output</b> topic 并打印在consumer控制台：
</p>

<pre class="brush: bash;">
&gt; bin/kafka-console-consumer.sh --bootstrap-server localhost:9092
    --topic streams-wordcount-output \
    --from-beginning \
    --formatter kafka.tools.DefaultMessageFormatter \
    --property print.key=true \
    --property print.value=true \
    --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
    --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer

all	    1
streams	1
lead	1
to	    1
kafka	1
</pre>

<p>
这里的第一个字段是 Kafka 消息的key值，格式是 <code>java.lang.String</code>，代表一个被统计到的单词，第二个字段是消息的value值，格式是 <code>java.lang.Long</code>，代表单词的最新统计次数。
</p>

接着我们继续在 producer 会话写入消息到input topic <b>streams-plaintext-input</b>。
输入文本"hello kafka streams"并回车。
终端应该如下：


<pre class="brush: bash;">
&gt; bin/kafka-console-producer.sh --broker-list localhost:9092 --topic streams-plaintext-input
all streams lead to kafka
hello kafka streams
</pre>

在另外一个运行着consumer的终端上，将观察到 WordCount 程序的如下输出数据：

<pre class="brush: bash;">
&gt; bin/kafka-console-consumer.sh --bootstrap-server localhost:9092
    --topic streams-wordcount-output \
    --from-beginning \
    --formatter kafka.tools.DefaultMessageFormatter \
    --property print.key=true \
    --property print.value=true \
    --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
    --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer

all	    1
streams	1
lead	1
to	    1
kafka	1
hello	1
kafka	2
streams	2
</pre>

这里显示的最后两行 <b>kafka 2</b> 和 <b>streams 2</b> 表示  <b>kafka</b> 和 <b>streams</b> 这两个key的计数已经更新，从 <b>1</b> 增长到 <b>2</b> 。
只要将更多输入消息写入 input topic ，都可以在 <b> streams-wordcount-output </b> topic 上观察到新消息，它们代表由WordCount应用程序计算出的最新字数。
在圆满完成这个快速示例之前让我们在producer控制台向input topic <b>streams-wordcount-input</b> 输入最后一行文本 "join kafka summit" 并回车：

<pre class="brush: bash;">
&gt; bin/kafka-console-producer.sh --broker-list localhost:9092 --topic streams-wordcount-input
all streams lead to kafka
hello kafka streams
join kafka summit
</pre>

<a name="anchor-changelog-output"></a>
<b>streams-wordcount-output</b> topic 将随后显示单词的计数更新（查看最后三行）：

<pre class="brush: bash;">
&gt; bin/kafka-console-consumer.sh --bootstrap-server localhost:9092
    --topic streams-wordcount-output \
    --from-beginning \
    --formatter kafka.tools.DefaultMessageFormatter \
    --property print.key=true \
    --property print.value=true \
    --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
    --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer

all	    1
streams	1
lead	1
to	    1
kafka	1
hello	1
kafka	2
streams	2
join	1
kafka	3
summit	1
</pre>

可以看出，Wordcount应用程序的输出实际上是连续更新的流，其中每个输出记录（即上面原始输出中的每行）是一个单词的更新计数，诸如“kafka”之类的记录的key值。
对于有相同key值的多条记录，每条新的记录都是前一条记录的更新。

<p>
    下面的两张图说明了后台实际发生的内容。
    第一列显示代表单词出现次数 <code> count </code> 的 <code>KTable&lt;String, Long&gt;</code> 的当前状态的演变。
    第二列显示从KTable的状态更新以及发送到Kafka output topic <b>streams-wordcount-output</b> 的更改记录。
</p>

<img src="images/streams-table-updates-02.png" style="float: right; width: 25%;">
<img src="images/streams-table-updates-01.png" style="float: right; width: 25%;">

<p>
    首先，当第一行文本 "all streams lead to kafka" 开始被处理。
    当每个新单词产生一个新表项（用绿色背景突出显示）时，<code> KTable </code> 也开始创建，并将相应的更改记录发送到下游<code> KStream </code>。
</p>
<p>
    当处理第二行文本“hello kafka streams”时，我们首次观察到<code> KTable </code>中现有的条目更新（这里是："kafka"和"streams"两个key值）。同样地，更改记录也被发送到 output topic 。
</p>
<p>
    处理过程大致如此（我们跳过了第三行如何处理的说明）。 这解释了为什么 output topic 具有我们上面显示的内容，因为它包含完整的更改记录。
</p>

<p>
    除了这个具体例子之外，Kafka Streams 还演示了如何利用 table 和 changelog stream 之间的对偶性（这里：table = KTable，changelog stream =下游KStream）：你可以将table的内容发布并转换为流，并且如果从头到尾使用整个 changelog stream ，则我们可以重新构建整个table的内容。
</p>

<h4><a id="quickstart_streams_stop" href="#quickstart_streams_stop">Step 6: Teardown the application</a></h4>

<p>现在，你可以通过 <b>Ctrl + C</b> 按钮按顺序停止 consumer，producer，Wordcount程序， Kafka broker和ZooKeeper服务。</p>

 <div class="pagination">
        <a href="{{version}}/documentation/streams" class="pagination__btn pagination__btn__prev">Previous</a>
        <a href="{{version}}/documentation/streams/tutorial" class="pagination__btn pagination__btn__next">Next</a>
    </div>
</script>

<div class="p-quickstart-streams"></div>

<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.01//EN" "http://www.w3.org/TR/html4/strict.dtd">
<html xmlns:og="http://ogp.me/ns#">

<head>
	<title>Kafka 中文文档 - ApacheCN</title>
	<link rel='stylesheet' href='./css/styles.css' type='text/css'>
	<link rel='stylesheet' href='./css/syntax-highlighting.css' type='text/css'>
	<link rel="icon" type="image/gif" href="./images/apache_feather.gif">
	<meta name="robots" content="index,follow" />
	<meta name="language" content="en" />
	<meta name="keywords" content="apache kafka messaging queuing distributed stream processing">
	<meta name="description" content="Apache Kafka: A Distributed Streaming Platform.">
	<meta http-equiv='Content-Type' content='text/html;charset=utf-8' />
	<meta name="viewport" content="initial-scale = 1.0,maximum-scale = 1.0" />
	<meta property="og:title" content="Apache Kafka" />
	<meta property="og:image" content="http://apache-kafka.org/images/apache-kafka.png" />
	<meta property="og:description" content="Apache Kafka: A Distributed Streaming Platform." />
	<meta property="og:site_name" content="Apache Kafka" />
	<meta property="og:type" content="website" />
	<link href="https://fonts.googleapis.com/css?family=Cutive+Mono|Roboto:400,700,900" rel="stylesheet">
	<script src="./js/jquery.min.js"></script>
	<script async src="./js/apachecn/googletagmanager.js"></script>
	<script>
		window.dataLayer = window.dataLayer || [];
		function gtag() { dataLayer.push(arguments); }
		gtag('js', new Date());

		gtag('config', 'UA-102475051-9');
	</script>

	<script>
		var _hmt = _hmt || [];
		(function () {
			var hm = document.createElement("script");
			hm.src = "https://hm.baidu.com/hm.js?9f2b74b80ab8aafb5970835acf96a0ea";
			var s = document.getElementsByTagName("script")[0];
			s.parentNode.insertBefore(hm, s);
		})();
	</script>

	<script>
		(function () {
			var bp = document.createElement('script');
			var curProtocol = window.location.protocol.split(':')[0];
			if (curProtocol === 'https') {
				bp.src = 'https://zz.bdstatic.com/linksubmit/push.js';
			}
			else {
				bp.src = 'http://push.zhanzhang.baidu.com/push.js';
			}
			var s = document.getElementsByTagName("script")[0];
			s.parentNode.insertBefore(bp, s);
		})();
	</script>
	<script>
		// DO NOT NEED TO UPDATE
		// Legacy versions of the documentation to not do frontend redirect for
		// These docs are written as a single super long file so no need to re-route
		var legacyDocPaths = [
			'./07/documentation',
			'./07/documentation/',
			'./08/documentation',
			'./08/documentation/',
			'./081/documentation',
			'./081/documentation/',
			'./082/documentation',
			'./082/documentation/',
			'./090/documentation',
			'./090/documentation/',
			'./0100/documentation',
			'./0100/documentation/'
		];

		// Any direct request for Streams documentation in docs versions prior to 0101
		// Redirect these requests to the standalone Streams doc page
		var currentPath = window.location.pathname;
		var shouldRedirect = !legacyDocPaths.includes(currentPath);
		var isDocumenationPage = currentPath.includes('/documentation');

		var hasNotSpecifiedFullPath = !currentPath.includes('/documentation/streams') && !currentPath.includes('/documentation/streams/');

		// Look for legacy anchors to clue us in on what full path the user needs
		// Add more as needed
		var specifiedStreamsAnchor = window.location.hash.includes('#streams_');

		if (shouldRedirect && isDocumenationPage && hasNotSpecifiedFullPath) {
			if (specifiedStreamsAnchor) {
				window.location.pathname = currentPath + 'streams';
			}
		}
	</script>
</head>
<!--#include virtual="../../includes/_header.htm" -->
<body>
	<div class="main">
		<div class="header">
			<a href=""><img width="325" height="97" class="logo" src="images/logo.png"></a>
		</div>

<!--#include virtual="../../includes/_top.htm" -->
<div class="content documentation documentation--current">
        <nav class="b-sticky-nav">
  <div class="nav-scroller">
    <div class="nav__inner">
      <a class="b-nav__home nav__item" href="">主页</a>
      <a class="b-nav__intro nav__item" href="intro.html">介绍</a>
      <a class="b-nav__quickstart nav__item" href="quickstart.html">快速开始</a>
      <a class="b-nav__uses nav__item" href="uses.html">使用案例</a>

      <div class="nav__item nav__item__with__subs">
        <a class="b-nav__docs nav__item nav__sub__anchor" href="documentation.html">文档</a>
        <a class="nav__item nav__sub__item" href="documentation.html#gettingStarted">入门</a>
        <a class="nav__item nav__sub__item" href="documentation.html#api">APIs</a>
        <a class="b-nav__streams nav__item nav__sub__item" href="documentation.html#streams">kafka streams</a>
        <a class="nav__item nav__sub__item" href="documentation.html#connect">kafka connect</a>
        <a class="nav__item nav__sub__item" href="documentation.html#configuration">配置</a>
        <a class="nav__item nav__sub__item" href="documentation.html#design">设计</a>
        <a class="nav__item nav__sub__item" href="documentation.html#implementation">实现</a>
        <a class="nav__item nav__sub__item" href="documentation.html#operations">操作</a>
        <a class="nav__item nav__sub__item" href="documentation.html#security">安全</a>
      </div>

      <a class="b-nav__performance nav__item" href="performance.html">性能</a>
      <a class="b-nav__poweredby nav__item" href="powered-by.html">powered by</a>
      <a class="b-nav__project nav__item" href="project.html">项目信息</a>
      <a class="b-nav__ecosystem nav__item" href="https://cwiki.apache.org/confluence/display/KAFKA/Ecosystem" target="_blank">生态圈</a>
      <a class="b-nav__clients nav__item" href="https://cwiki.apache.org/confluence/display/KAFKA/Clients" target="_blank">客户端</a>
      <a class="b-nav__events nav__item" href="events.html">事件</a>
      <a class="b-nav__contact nav__item" href="contact.html">联系我们</a>

      <div class="nav__item nav__item__with__subs">
        <a class="b-nav__apache nav__item nav__sub__anchor b-nav__sub__anchor" href="#">apache</a>
        <a class="b-nav__apache nav__item nav__sub__item" href="http://www.apache.org/" target="_blank">贡献</a>
        <a class="b-nav__apache nav__item nav__sub__item" href="http://www.apache.org/licenses/" target="_blank">license</a>
        <a class="b-nav__apache nav__item nav__sub__item" href="http://www.apache.org/foundation/sponsorship.html" target="_blank">赞助</a>
        <a class="b-nav__apache nav__item nav__sub__item" href="http://www.apache.org/foundation/thanks.html" target="_blank">感谢</a>
        <a class="b-nav__apache nav__item nav__sub__item" href="http://www.apache.org/security/" target="_blank">安全</a>
      </div>

      <a class="btn" href="downloads.html">下载</a>
      <div class="social-links">
        <a class="twitter" href="https://twitter.com/apachekafka" target="_blank">@apachekafka</a>
      </div>
    </div>
  </div>
  <div class="navindicator">
    <div class="b-nav__home navindicator__item"></div>
    <div class="b-nav__intro navindicator__item"></div>
    <div class="b-nav__quickstart navindicator__item"></div>
    <div class="b-nav__uses navindicator__item"></div>
    <div class="b-nav__docs navindicator__item"></div>
    <div class="b-nav__performance navindicator__item"></div>
    <div class="b-nav__poweredby navindicator__item"></div>
    <div class="b-nav__project navindicator__item"></div>
    <div class="b-nav__ecosystem navindicator__item"></div>
    <div class="b-nav__clients navindicator__item"></div>
    <div class="b-nav__events navindicator__item"></div>
    <div class="b-nav__contact navindicator__item"></div>
  </div>
</nav>

        <!--#include virtual="../../includes/_nav.htm" -->
        <div class="right">
                <a class="documentation__banner b-sticky-doc-banner" href="documentation">
  You're viewing documentation for an older version of Kafka - check out our current documentation here.
</a>

            <!--#include virtual="../../includes/_docs_banner.htm" -->
        <ul class="breadcrumbs">
            <li><a href="documentation">Documentation</a></li>
            <li><a href="documentation/streams">Streams</a></li>
        </ul>
        <div class="p-content"></div>
    </div>
</div>
				</div>
			</div>
		</div>
		<div class="footer">
			<div class="footer__inner">
				<div class="footer__legal">
					<span class="footer__legal__one">The contents of this website are &copy; 2016 <a href="https://www.apache.org/" target="_blank">Apache Software Foundation</a> under the terms of the <a href="https://www.apache.org/licenses/LICENSE-2.0.html" target="_blank">Apache License v2</a>.</span>
					<span class="footer__legal__two">Apache Kafka, Kafka, and the Kafka logo are either registered trademarks or trademarks of The Apache Software Foundation</span>
					<span class="footer__legal__three">in the United States and other countries.</span>
				</div>
				<a class="apache-feather" target="_blank" href="http://www.apache.org">
					<img width="40" src="images/feather-small.png" alt="Apache Feather">
				</a>
			</div>
		</div>
	</body>

    <script type="text/javascript" src="js/syntaxhighlighter.js"></script>
	<script src="https://cdnjs.cloudflare.com/ajax/libs/handlebars.js/2.0.0/handlebars.js"></script>
	<script>
		$(function () {
			// list of pages that are rendered with Handlebars
			var templates = [
				'introduction',
				'implementation',
				'design',
				'api',
				'configuration',
				'ops',
				'security',
				'connect',
				'streams',
				'quickstart',
				'toc',
				'upgrade',
				'content'
			];

			// loop through all Handlebar templates on the page and render them
			for(var i = 0; i < templates.length; i++) {
				var templateScript = $("#" + templates[i] + "-template").html();
				if(templateScript) {
					var template = Handlebars.compile(templateScript);
					var html = template(context);
					$(".p-" + templates[i]).html(html);
				}
			}
		});
	</script>

	<script src="js/jquery.sticky-kit.min.js"></script>
	<script>
		$(function() {
			// Set mobile scroll position on nav
			function setNavScroll(offsetLeft) {
				$('.nav-scroller').animate({
					scrollLeft: $('.nav-scroller').scrollLeft() + $('nav .selected').offset().left - offsetLeft
				}, 50);
			}

			// Helper classes for nav
			$('nav').mouseenter(function(){
				$(this).addClass('hovering');
			});
			$('nav').mouseleave(function(){
				$(this).removeClass('hovering');
			});

			// Handle expanding sections of nav (async)
			$('.b-nav__sub__anchor').click(function(){
				$('nav .selected').removeClass('selected');
				$('.nav__item__with__subs--expanded').removeClass('nav__item__with__subs--expanded');

				$(this).addClass('selected');
				$(this).parent().toggleClass('nav__item__with__subs--expanded');

				if($(window).width() <= 650) {
					window.setTimeout(function(){
						setNavScroll(30);
					}, 300);
				}
			});

			// Initialize sticky elements on the page
			if($(window).width() > 650) {
				// Nav for desktop
				$('.b-sticky-nav').stick_in_parent({offset_top: 40});
				// Documentation banner for desktop
				$('.b-sticky-doc-banner').stick_in_parent({offset_top: 0});
			}	else {
				// Scroll nav for mobile so current nav item is in view
				window.setTimeout(function(){
					setNavScroll(80);
				}, 300);
			}

			// On window resize check to see if stuff should be unstuck
			window.onresize = function(event) {
			  if($(window).width() <= 650) {
			    $('.b-sticky-nav').trigger("sticky_kit:detach");
			  } else {
			    $('.b-sticky-nav').stick_in_parent({offset_top: 40});
					$('.b-sticky-doc-banner').stick_in_parent({offset_top: 0});
			  }
			};
		});
	</script>


</html>

<!--#include virtual="../../includes/_footer.htm" -->
<script>
$(function() {
           // Show selected style on nav item
          $('.b-nav__streams').addClass('selected');

   
          //sticky secondary nav
          var $navbar = $(".sub-nav-sticky"),
               y_pos = $navbar.offset().top,
               height = $navbar.height();
       
           $(window).scroll(function() {
               var scrollTop = $(window).scrollTop();
           
               if (scrollTop > y_pos - height) {
                   $navbar.addClass("navbar-fixed")
               } else if (scrollTop <= y_pos) {
                   $navbar.removeClass("navbar-fixed")
               }
           });
           // Display docs subnav items
          $('.b-nav__docs').parent().toggleClass('nav__item__with__subs--expanded');
});
</script>
